热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

这个名字|媒介_SpringCloudStream消息驱动设计思想以及整合rabbitmq消息队列案例学习笔记

篇首语:本文由编程笔记#小编为大家整理,主要介绍了SpringCloudStream消息驱动设计思想以及整合rabbitmq消息队列案例--学习笔记相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了SpringCloud Stream消息驱动设计思想以及整合rabbitmq消息队列案例--学习笔记相关的知识,希望对你有一定的参考价值。



一,RabbitMQ的安装和配置 并启动

参见 RabbitMQ的安装和配置https://blog.csdn.net/weixin_43025151/article/details/123186641
RabbitMQ启动成功:


二,SpringCloud Stream消息驱动


1,设计思想

一个标准的消息队列MQ,如下图:

为什么用CloudStream?


一句话:CloudStream屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型!




通过定义绑定器Binder作为中间件,实现了应用程序与消息中间件细节之间的隔离。

绑定器Binder:INPUT适用于消费者 OUTPUT适用于生产者
Stream中的消息通讯方式遵循了发布-订阅模式 Topic主题进行广播 在RabbitMQ就是Exchange 在Kafka中就是Topic。


2,Springcloud Stream标准流程套路


Binder:很方便的连接中间件,屏蔽差异


Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置


Source和Sink:简单的可理解为参照对象是Springcloud


Stream自身,从Stream发布消息就是输出,接受消息就是输入


3,编码API和常用注解


三,详细案例(整合rabbitmq消息队列)


1,新建父工程 springcloud-tigerhhzz

pom文件代码


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.tigerhhzz.springcloud-tigerhhzz
springcloud-tigerhhzz
pom
1.0-SNAPSHOT

cloud-api-commons
cloud-eureka-server7001
cloud-eureka-server7002
cloud-comsumer-order80
cloud-provider-payment8001
cloud-provider-payment8002
cloud-comsumer-feign-order80
cloud-provider-hystrix-payment8001
cloud-comsumer-feign-hystrix-order80
cloud-comsumer-hystrix-dashboard9001
cloud-gateway-gateway9527
cloud-config-center-3344
cloud-config-client-3355
cloud-config-client-3366
cloud-stream-rabbitmq-provider8801
cloud-stream-rabbitmq-consumer8802
cloud-stream-rabbitmq-consumer8803


UTF-8
1.8
1.8
4.12
1.2.17
1.18.0
<mysql.version>5.1.47
1.1.16
1.3.2






org.springframework.boot
spring-boot-dependencies
2.2.2.RELEASE
pom
import



org.springframework.cloud
spring-cloud-dependencies
Hoxton.SR1
pom
import



com.alibaba.cloud
spring-cloud-alibaba-dependencies
2.1.0.RELEASE
pom
import


mysql
mysql-connector-java
$mysql.version


com.alibaba
druid
$druid.version


org.mybatis.spring.boot
mybatis-spring-boot-starter
$mybatis.spring.boot.version


log4j
log4j
$log4j.version


junit
junit
$junit.version


org.projectlombok
lombok
$lombok.version
true





2&#xff0c;新建注册中心模块&#xff08;本文使用erueka作为注册中心&#xff09;

cloud-eureka-server7001
项目结构&#xff1a;

项目main入口方法&#xff1a;

package com.tigerhhzz.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
/**
* &#64;author tigerhhzz
* &#64;date 2022/6/13 10:33
*/
&#64;SpringBootApplication
&#64;EnableEurekaServer
public class EurekaMain7001
public static void main(String[] args)
SpringApplication.run(EurekaMain7001.class,args);

pom.xml


xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

springcloud-tigerhhzz
com.tigerhhzz.springcloud-tigerhhzz
1.0-SNAPSHOT

4.0.0
cloud-eureka-server7001


org.springframework.cloud
spring-cloud-starter-netflix-eureka-server


org.springframework.boot
spring-boot-starter-web


org.springframework.boot
spring-boot-starter-actuator


com.tigerhhzz.springcloud-tigerhhzz
cloud-api-commons
1.0-SNAPSHOT



application.yml

server:
port: 7001
spring:
application:
name: cloud-eureka-server7001
eureka:
instance:
hostname: eureka7001.com
client:
register-with-eureka: false # 表示不向注册中心注册
fetch-registry: false # 由于注册中心的职责就是维护服务实例&#xff0c;所以它不需要去检索服务
service-url:
#defaultZone: http://eureka7002.com:7002/eureka/ #集群 指向其他eureka
defaultZone: http://eureka7001.com:7001/eureka/ #单机 指向自己
server:
enable-self-preservation: false #关闭自我保护机制 &#xff0c;保证不可用服务被及时剔除

3&#xff0c;新建三个子模块


cloud-stream-rabbitmq-provider8801&#xff0c;作为生产者进行消息模块


cloud-stream-rabbitmq-consumer8802&#xff0c;作为消费者接受模块


cloud-stream-rabbitmq-consumer8803&#xff0c;作为消费者接受模块

目录结构&#xff1a;
module:cloud-stream-rabbitmq-provider8801
pom.xml


xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

springcloud-tigerhhzz
com.tigerhhzz.springcloud-tigerhhzz
1.0-SNAPSHOT

4.0.0
cloud-stream-rabbitmq-provider8801


org.springframework.cloud
spring-cloud-starter-stream-rabbit


org.springframework.cloud
spring-cloud-starter-netflix-eureka-client


org.springframework.boot
spring-boot-starter-web


org.springframework.boot
spring-boot-starter-actuator


org.springframework.boot
spring-boot-starter-test
test


com.tigerhhzz.springcloud-tigerhhzz
cloud-api-commons
1.0-SNAPSHOT



application.yml

server:
port: 8801
spring:
application:
name: cloud-stream-privider
cloud:
stream:
binders: #自此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称&#xff0c;用于binding整合
type: rabbit #消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
output: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的exchange名称定义
content-type: application/json #设置消息类型&#xff0c;本次为json
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/ #http://localhost:7001/eureka/
instance:
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
instance-id: send-8801.com
prefer-ip-address: true #访问的路径变为IP地址

主启动类

package com.tigerhhzz.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* &#64;author tigerhhzz
* &#64;date 2022/6/18 10:00
*/
&#64;SpringBootApplication
public class StreamMQMain8801
public static void main(String[] args)
SpringApplication.run(StreamMQMain8801.class,args);

业务类–接口

package com.tigerhhzz.springcloud.service;
/**
* &#64;author tigerhhzz
* &#64;date 2022/6/18 10:01
*/
public interface IMessageService
public String send();

业务类–实现类

package com.tigerhhzz.springcloud.service.impl;
import cn.hutool.core.lang.UUID;
import com.tigerhhzz.springcloud.service.IMessageService;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
/**
* &#64;author tigerhhzz
* &#64;date 2022/6/18 10:01
*/
&#64;EnableBinding(Source.class) //定义消息推送通道
public class MessageServiceImpl implements IMessageService
//消息发送通道
&#64;Resource
private MessageChannel output;
&#64;Override
public String send()
String serial &#61; UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("***********serial:"&#43;serial);
return serial;

controller层

package com.tigerhhzz.springcloud.controller;
import com.tigerhhzz.springcloud.service.IMessageService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* &#64;author tigerhhzz
* &#64;date 2022/6/18 10:05
*/
&#64;RestController
public class SendMessageController
&#64;Resource
private IMessageService messageService;
&#64;RequestMapping("/sendMessage")
public String sendMessage()
String send &#61; messageService.send();
return send;

8802基本和生产者8801的module一样

8802的配置&#xff1a;

server:
port: 8802
spring:
application:
name: cloud-stream-message-consumer
cloud:
stream:
binders: #自此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称&#xff0c;用于binding整合
type: rabbit #消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
input: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的exchange名称定义
content-type: application/json #设置消息类型&#xff0c;本次为json
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/ #http://localhost:7001/eureka/
instance:
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
instance-id: receiver-8802.com
prefer-ip-address: true #访问的路径变为IP地址

8802的控制层&#xff1a;

package com.tigerhhzz.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* &#64;author tigerhhzz
* &#64;date 2022/6/18 10:32
*/
&#64;Component
&#64;EnableBinding(Sink.class)
public class ReceiveMessageListenerController
&#64;Value("$server.port")
private String serverPort;
&#64;StreamListener(Sink.INPUT)
public void input(Message message)
System.out.println("我是消费者8802&#xff0c;-----》接受到的消息是&#xff1a;"&#43;message.getPayload()&#43;"\\t"&#43;serverPort);

依照8802&#xff0c;clone出来一份运行8803


4&#xff0c;测试

依次启动rabbitmq
7001注册中心
8801消息生产模块
8802和8803消息消费模块



eureka7001注册中心

测试&#xff1a; 8801发送两条消息
http://localhost:8801/sendMessage

***********serial:76f1e833-100b-4d30-9423-4de0fad7e71e
***********serial:cfbcbed8-ba27-4036-8c5b-264a44cffea4

8802和8803分别接收到两条消息

我是消费者8802&#xff0c;-----》接受到的消息是&#xff1a;76f1e833-100b-4d30-9423-4de0fad7e71e 8802
我是消费者8802&#xff0c;-----》接受到的消息是&#xff1a;cfbcbed8-ba27-4036-8c5b-264a44cffea4 8802

我是消费者8803&#xff0c;-----》接受到的消息是&#xff1a;76f1e833-100b-4d30-9423-4de0fad7e71e 8803
我是消费者8803&#xff0c;-----》接受到的消息是&#xff1a;cfbcbed8-ba27-4036-8c5b-264a44cffea4 8803

5&#xff0c;分组消费与持久化


微服务应用放置于同一个group中&#xff0c;就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的&#xff0c;同一个组内会发生竞争关系&#xff0c;只有其中一个可以消费。


8803/8802都变成相同组&#xff0c;group两个相同


8802/8803实现了轮询分组&#xff0c;每次只有一个消费者。8801模块的发的消息只能被8802或者8803其中一个接受到&#xff0c;这样避免了重复消费


加上分组group属性&#xff0c;可以实现消息队列的持久化&#xff0c;group属性是持久化消息队列中很重要的一个属性。

rabiitmq界面&#xff1a;


推荐阅读
  • 历经两个月,他成功斩获阿里巴巴Offer
    经过两个月的努力,一位普通的双非本科毕业生最终成功获得了阿里巴巴的录用通知。 ... [详细]
  • 本文详细介绍了Java中实现异步调用的多种方式,包括线程创建、Future接口、CompletableFuture类以及Spring框架的@Async注解。通过代码示例和深入解析,帮助读者理解并掌握这些技术。 ... [详细]
  • 本文总结了近年来在实际项目中使用消息中间件的经验和常见问题,旨在为Java初学者和中级开发者提供实用的参考。文章详细介绍了消息中间件在分布式系统中的作用,以及如何通过消息中间件实现高可用性和可扩展性。 ... [详细]
  • 深入解析SpringMVC核心组件:DispatcherServlet的工作原理
    本文详细探讨了SpringMVC的核心组件——DispatcherServlet的运作机制,旨在帮助有一定Java和Spring基础的开发人员理解HTTP请求是如何被映射到Controller并执行的。文章将解答以下问题:1. HTTP请求如何映射到Controller;2. Controller是如何被执行的。 ... [详细]
  • 深入解析Spring启动过程
    本文详细介绍了Spring框架的启动流程,帮助开发者理解其内部机制。通过具体示例和代码片段,解释了Bean定义、工厂类、读取器以及条件评估等关键概念,使读者能够更全面地掌握Spring的初始化过程。 ... [详细]
  • Spring Boot 中静态资源映射详解
    本文深入探讨了 Spring Boot 如何简化 Web 应用中的静态资源管理,包括默认的静态资源映射规则、WebJars 的使用以及静态首页的处理方法。通过本文,您将了解如何高效地管理和引用静态资源。 ... [详细]
  • 本文探讨了如何在Java中使用JAXB解组两个具有相同名称但不同结构的对象。我们将介绍一个抽象类Bar及其具体实现,并展示如何正确地解析XML文档以获取正确的对象实例。 ... [详细]
  • springMVC JRS303验证 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
  • 近期我们开发了一款包含天气预报功能的万年历应用,为了满足这一需求,团队花费数日时间精心打造并测试了一个稳定可靠的天气API接口,现正式对外开放。 ... [详细]
  • 烤鸭|本文_Spring之Bean的生命周期详解
    烤鸭|本文_Spring之Bean的生命周期详解 ... [详细]
  • 本文介绍了如何利用Java中的URLConnection类来实现基本的网络爬虫功能,包括向目标网站发送请求、接收HTML响应、解析HTML以提取所需信息,并处理可能存在的递归爬取需求。 ... [详细]
  • 利用GitHub热门资源,成功斩获阿里、京东、腾讯三巨头Offer
    Spring框架作为Java生态系统中的重要组成部分,因其强大的功能和灵活的扩展性,被广泛应用于各种规模的企业级应用开发中。本文将通过一份在GitHub上获得极高评价的Spring全家桶文档,探讨如何掌握Spring框架及其相关技术,助力职业发展。 ... [详细]
  • RabbitMQ消息分发策略与确认机制
    本文详细介绍了RabbitMQ的消息分发轮询机制以及消息确认(Message Acknowledgment)功能,通过实例演示了如何确保消息可靠传递。 ... [详细]
  • RabbitMQ 核心组件解析
    本文详细介绍了RabbitMQ的核心概念,包括其基本原理、应用场景及关键组件,如消息、生产者、消费者、信道、交换机、路由键和虚拟主机等。 ... [详细]
author-avatar
大廷705
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有